[PySpark]*.csvファイルを再帰的にヘッダーを除去しつつ読み込む
はじめに
Sparkはファイルシステムから直接ファイルを読むことができますが、ログとして出力されたファイルなどはいろいろなディレクトリに散らばっていることが多いです。 今回はPySparkでそれらのファイルを親ディレクトリから再帰的に走査して、目的のファイルだけをRDDに読み込んでみます。 ついでに、それらのファイルには先頭に1行ヘッダがあるという前提で、その除去も行います。
ファイルを読み込むメソッド
RDDにテキストファイルを読み込むメソッドは2つあります。
- textFile
- wholeTextFiles
textFile
は単独ファイル、wholeTextFiles
は複数ファイル、と思ってしまいがちですが、実際のところはそういう使い分けをすることはないです。
これらのメソッドの本質的な違いは、ファイルを読み込んだ後のRDDの形式が異なることです。
下のような2つのファイルがあるとします。
$ head aaa xxx ==> aaa <== aaa bbb ccc ==> xxx <== xxx yyy zzz
print sc.textFile("aaa,xxx").collect() ==> [u'aaa', u'bbb', u'ccc', u'xxx', u'yyy', u'zzz']
textFile
はファイルを読み込み、改行で区切った各要素を並べたRDDが作成されます。
print sc.wholeTextFile("aaa,xxx").collect() [(u'file:/Users/hirano.shigetoshi/Spark/spark-2.3.2-bin-hadoop2.7/aaa', u'aaa\nbbb\nccc\n'), (u'file:/Users/hirano.shigetoshi/Spark/spark-2.3.2-bin-hadoop2.7/xxx', u'xxx\nyyy\nzzz\n')]
wholeTextFiles
はファイルを読み込み、[<ファイルパス>, <ファイル内容全文>]
というtupleの集合としてRDDが作成されます。
指定できるパスについて
上記の例で、ファイルパスとしてaaa,xxx
と指定していますが、textFile
もwholeTextFiles
も、複数のパスをカンマ区切りで指定できます。
また、指定したパスがディレクトリであれば、その中の全てのファイルを読み込みますし、ある程度ワイルドカードを使うこともできます。1
このような仕様であるため、名前の印象とは裏腹に、この2つのメソッドを「どのファイルを読み込むか」という視点で使い分けも特に意味はないです。 どういう形式のRDDを作成するか、という視点で使い分けるのが良いようです。
wholeTextFilesを使って読み込み
では本題の読み込みを行いたいと思います。
2つの読み込みメソッドを紹介しましたが、今回使うのはwholeTextFiles
の方です。
使い方と合わせてその理由もご紹介します。
ファイル構成
以下のようにファイルが並んでいるとします。 今回はこの中のcsvだけが読み込み対象であるとします。
. ├── aaa │ ├── 000.txt │ ├── 000.csv │ ├── 001.txt │ └── 001.csv ├── bbb │ └── ccc │ ├── 003.txt │ └── 003.csv ・・・
PySparkのコード
まずヘッダを除去する関数を定義しておきます。
def cut_header(text): # 最初の1行を除外する pos = text.find("\n") if pos < 0: return "" else return text[pos+1]`
次にwholeTextFiles
で読み込み、ヘッダを除去して、改行区切りデータのRDDを作成します。
rdd = sc.wholeTextFiles("*/*.csv,*/*/*.csv")\ .map(lambda x: x[1])\ .map(lambda x: cut_header(x))\ .flatMap(lambda x: x.split("\n"))
csvファイルだけを指定するために、*/*.csv,*/*/*.csv
と指定しています。カッコ悪いですね。
しかし残念ながらこれよりも良い書き方は見つかりませんでした。
zshなどで使える **/*.csv
という書き方はできないようです。
データがある程度動的に吐き出されるものだとしても、ディレクトリ階層の深さが全く読めないという事態は滅多にないと思うので、現実的には、ある程度の深さまでのパスを書いておけば良いかと思います。
もっとスマートに書けるよ、という方は是非教えていただければと思います。
以下、ヘッダを除去してフラットなRDDを得る手順です。
map(lambda x: x[1])
- 今回はファイル名は不要なので、ファイルの中身(tupleの[1])だけにしています。
map(lambda x: cur_header(x))
x
には1ファイルの中身全てが(改行コード込みで)格納されていますので、各々をcut_header
で処理しています。
- flatMap(lambda x: x.split("\n"))`
- flatMapは、各処理から配列を受け取って、その要素を全て一つのRDD(配列)に入れる、という処理です。
- flatMapによって
wholeTextFiles
で読んだ後にtextFiles
で読む形式に変換しています。
wholeTextFiles
で読み込んだことにより、ファイルという単位のRDDレコードが作成されたので、その中で先頭行の除去を行っています。
これはtextFile
だとうまくいきません。
textFile
をやった時点ですでにflatMap
相当の処理が起きているので、rdd
には元のファイルの区切りという情報は残っていません。
ですので、「ファイルの何行目」などの操作が必要な場合にはwholeTextFiles
を使用します。
textFileとwholeTextFilesの使い分け
繰り返しになりますが、両者の使い分けはファイルの数などではなく、どうファイルを読み込むかです。
wholeTextFiles
はファイル名の情報も使えるし、ファイルという単位での整形ができますから、データの整形をしつつ読み込む場合にはこちらを使うのが良いです。
ただし、wholeTextFiles
はファイルの中身を1レコードとして全て読み込みますので、あまり大きなファイルを読むことは避けたいです。
その場合はtextFile
で読み込んで、ファイル単位出なくても整形できないかを考えた方が良さそうです。
元データを加工できるのであれば、ヘッダには必ず先頭に"#"をつけるなどの処置をするのがベストです。
そうすればflatになったデータでも先頭だけをみてヘッダを除去できます。
Sparkに向かない処理はNG
もちろん私の失敗談ですが、ファイルの指定の仕方にクセがあるので、それに触りたくないと思って最初は以下のようなコードを書いてしまいがちです。
rdd = sc.parallelize([]) for x in glob.glob("."): tmp_rdd = sc.textFile(x) rdd = rdd.union(tmp_rdd)
ファイルごとにRDDを作成して結合していく、という配列ではよくあるやり方です。 しかしこれはSpark的には全くパフォーマンスが出ないやり方なので、やってはいけません。 というか、ファイルの量が多くなるとStackOverflowでエラーになってしまうようでした。
再帰的に読み込む別解
2018-10-31追記
wholeTextFiles("*")
としても、再帰的に全てのファイルを読んでいる訳ではないようです。
wholeTextFiles("*")
とすると、"カレントディレクトリにあるディレクトリの直下のファイル"が全て読み込まれるようです。
`*.csv,*/*.csv,・・・` と書くのはやっぱり嫌なので、一応別解もあります。
`wholeTextFiles`に`*`を渡すと、再帰的にすべてのファイルが読み込まれるようです。
そしてファイル名でフィルターしてcsvにしています。
処理の仕方も"Spark的"ですし、一見完璧に見えるのですが、これだと全てのファイルを一度メモリ上に読み込む必要がありますので、目的のファイル以外にどんなファイルがあるのか確認してからやった方が良いでしょう。
大きいバイナリファイルなんかも問答無用で読み込んでしまいます。
逆に、ある程度整備されていて小さいファイルだけであれば、この方法の方がスマートであるのは確かです。
状況に応じて使い分けましょう。
まとめ
PySparkで散らばったログファイルなどを読み込む際のノウハウについてご紹介しました。 ファイルパスの指定の仕方が厄介なので、そこさえクリアできれば特に難しいことはないのですが、いざやってみるとハマります。 素直でない読み込み方だと非常に遅かったりエラーになってしまうので試行錯誤してみました。
誰かの参考になれば嬉しいです。
-
ソースコード的にはHadoopの
FileInputFormat
のsetInputPaths
に依存しているようです。 ↩